package dai.samples.batch.process;

import dai.samples.batch.entity.BatchEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectMarshaller;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.builder.JsonFileItemWriterBuilder;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * 任务执行流程配置
 * @author daify
 * @date 2020-11-15
 */
@Slf4j
@Configuration
public class ProcessJobConfig {

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;


    @Autowired
    PlatformTransactionManager transactionManager;

    /**
     * 任务的结束，默认会执行success步骤，如果出现Failed则直接失败
     * @param jobRepository
     * @return
     */
    @Bean("processJobEnd")
    public Job processJobEnd(JobRepository jobRepository) {
        return this.jobBuilderFactory.get("processJobEnd")
                .repository(jobRepository)
                .start(processStep())
                .next(jobExecutionDecider())
                .on("*").to(success())
                .from(jobExecutionDecider())
                .on("FAILED").fail()
                .end()
                .build();
    }



    /**
     * 任务的流程控制
     * @param jobRepository
     * @return
     */
    @Bean("processJob")
    public Job processJob(JobRepository jobRepository) {
        return this.jobBuilderFactory.get("processJob")
                .repository(jobRepository)
                .start(processStep())
                .next(jobExecutionDecider())
                .on("*").to(success())
                .from(jobExecutionDecider())
                .on("FAILED").to(failed())
                .end()
                .build();
    }

    /**
     * 成功时执行的方法
     * @return
     */
    @Bean("processStepSuccess")
    public Step success() {
        return this.stepBuilderFactory.get("successStep")
                .<BatchEntity, BatchEntity>chunk(1)
                .<BatchEntity, BatchEntity>chunk(1)
                .reader(itemReader())
                .processor(successProcessor())
                .writer(itemWriter())
                .allowStartIfComplete(true)
                .build();
    }

    /**
     * 失败时执行的方法
     * @return
     */
    @Bean("processStepFailed")
    public Step failed() {
        return this.stepBuilderFactory.get("failedStep")
                .<BatchEntity, BatchEntity>chunk(1)
                .<BatchEntity, BatchEntity>chunk(1)
                .reader(itemReader())
                .processor(failedProcessor())
                .writer(itemWriter())
                .allowStartIfComplete(true)
                .build();
    }

    /**
     * 失败时执行的处理方法
     * @return
     */
    private ItemProcessor<BatchEntity, ? extends BatchEntity> failedProcessor() {
        return (ItemProcessor<BatchEntity, BatchEntity>) entity -> {
            System.out.println("+-------failedProcessor----------");
            String fullName = entity.getFirstName() + " failed " +
                    entity.getLastName();
            entity.setFullName(fullName);
            System.out.println(fullName + ":" + entity.getAge());
            return entity;
        };
    }

    /**
     * 成功时执行的处理方法
     * @return
     */
    private ItemProcessor<BatchEntity, ? extends BatchEntity> successProcessor() {
        return (ItemProcessor<BatchEntity, BatchEntity>) entity -> {
            System.out.println("+-------successProcessor----------");
            String fullName = entity.getFirstName() + " success " +
                    entity.getLastName();
            entity.setFullName(fullName);
            System.out.println(fullName + ":" + entity.getAge());
            return entity;
        };
    }



    @Bean("processStep")
    public Step processStep() {
        return this.stepBuilderFactory.get("processStep")
                .<BatchEntity, BatchEntity>chunk(1)
                .reader(itemReader())
                .processor(getProcessor())
                .writer(itemWriter())
                .faultTolerant()
                .build();
    }

    /**
     * 决策流、条件结果
     * @return
     */
    @Bean
    public JobExecutionDecider jobExecutionDecider() {
        return new JobExecutionDecider() {
            @Override
            public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
                // 根据传递参数的类型返回对应类型
                String type = jobExecution.getJobParameters().getString("type");
                return new FlowExecutionStatus(type);
            }
        };
    }

    /**
     * JSON写数据
     * @return
     */
    public ItemWriter<BatchEntity> itemWriter() {
        String name = "processJob-" + System.currentTimeMillis();
        String patch = "target/test-outputs/" + name + ".json";
        return new JsonFileItemWriterBuilder<BatchEntity>()
                .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                .resource(new FileSystemResource(patch))
                .name(name)
                .build();
    }

    /**
     * JSON读数据
     * @return
     */
    public ItemReader<BatchEntity> itemReader() {
        return new JsonItemReaderBuilder<BatchEntity>()
                .jsonObjectReader(new JacksonJsonObjectReader<>(BatchEntity.class))
                .resource(new ClassPathResource("data/batchJob.json"))
                .name("batchJobReader")
                .build();
    }

    public ItemProcessor<BatchEntity, ? extends BatchEntity> getProcessor() {
        return new ChangeNameProcessor();
    }


    /**
     * 处理过程
     */
    class ChangeNameProcessor implements ItemProcessor<BatchEntity, BatchEntity> {

        @Override
        public BatchEntity process(BatchEntity person) {
            String fullName =person.getFirstName() + " " +
                    person.getLastName();
            person.setFullName(fullName);
            System.out.println(fullName + ":" + person.getAge());
            return person;
        }
    }

}
